Harden data races between threaded input workers and the main engine #12007

Open
erain wants to merge 4 commits into fluent:master from erain:upstream-pr/threaded-input-data-races

@erain erain commented Jun 26, 2026

Contributor

Summary

This PR fixes three data races between threaded input worker threads and the
main engine thread, all surfaced by ThreadSanitizer while running a threaded
tail input (with multiline) alongside the metrics exporter. They are
undefined behavior under the C memory model and benign on the supported
hardware, but worth closing:

  • metrics: flb_metrics_sum() updates a counter from the owning
    input/output worker thread while the metrics exporter reads the same counter
    from the main thread. This one is live whenever metrics are scraped together
    with a threaded input.
  • input: flb_input_collector_fd() runs on the main thread but iterated a
    threaded input's collectors, racing the worker thread that initializes those
    collector fds at startup. A threaded input's collectors are registered and
    dispatched in the input's own thread/event loop and are never matched here,
    so the handler now skips threaded inputs (also a small optimization).
  • engine / bin: config->grace_input is published by the engine thread
    during startup and read by the supervisor on the main thread.
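
The collector skip in the second item can be sketched roughly as follows. This is a minimal illustration, not the actual flb_input.c code: the demo_collector and demo_input structs, the is_threaded field, and demo_find_collector_owner() are hypothetical stand-ins for Fluent Bit's real structures.

```c
#include <stddef.h>

/* Hypothetical, simplified stand-ins for Fluent Bit's input structures. */
struct demo_collector {
    int fd;                         /* descriptor the collector waits on */
};

struct demo_input {
    const char *name;
    int is_threaded;                /* non-zero: input runs in its own worker thread */
    struct demo_collector *collectors;
    size_t n_collectors;
};

/*
 * Main-thread lookup of the input that owns 'fd'. Threaded inputs are
 * skipped before their collectors are touched, since in the PR's description
 * those collectors are registered and dispatched by the worker's own event
 * loop and can never match here.
 */
const struct demo_input *demo_find_collector_owner(const struct demo_input *inputs,
                                                   size_t n_inputs, int fd)
{
    size_t i;
    size_t j;

    for (i = 0; i < n_inputs; i++) {
        if (inputs[i].is_threaded) {
            continue;               /* never read a worker-owned collector fd */
        }
        for (j = 0; j < inputs[i].n_collectors; j++) {
            if (inputs[i].collectors[j].fd == fd) {
                return &inputs[i];
            }
        }
    }
    return NULL;
}
```

With one threaded and one non-threaded input, a lookup for the threaded input's fd returns NULL without reading its collector array, which is the access ThreadSanitizer flagged.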

A small include/fluent-bit/flb_atomic.h helper is added (relaxed GCC/Clang
__atomic builtins, plain-access fallback) and used for the metric counter and
grace_input.
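
For illustration, a helper of this shape and its use on a metric-style counter might look like the sketch below. The demo_* names are hypothetical and the GCC/Clang branch is an assumption about how the builtins are wrapped; only the plain-access fallback mirrors lines quoted from the PR.

```c
#include <pthread.h>
#include <stdint.h>

/* Hypothetical demo_* macros; the real helper is flb_atomic.h in the PR. */
#if defined(__GNUC__) || defined(__clang__)
#define demo_atomic_load(ptr)         __atomic_load_n((ptr), __ATOMIC_RELAXED)
#define demo_atomic_store(ptr, val)   __atomic_store_n((ptr), (val), __ATOMIC_RELAXED)
#define demo_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED)
#else
/* plain-access fallback, as described in the PR */
#define demo_atomic_load(ptr)         (*(ptr))
#define demo_atomic_store(ptr, val)   (*(ptr) = (val))
#define demo_atomic_fetch_add(ptr, v) (*(ptr) += (v))
#endif

struct demo_metric {
    uint64_t val;
};

struct demo_worker_arg {
    struct demo_metric *metric;
    int iters;
};

/* Worker side: plays the role of the thread that sums into the counter. */
void *demo_worker(void *p)
{
    struct demo_worker_arg *arg = p;
    int i;

    for (i = 0; i < arg->iters; i++) {
        demo_atomic_fetch_add(&arg->metric->val, 1);
    }
    return NULL;
}

/*
 * Starts up to 16 workers, reads the counter once while they may still be
 * running (the exporter's role), joins them, and returns the final value.
 */
uint64_t demo_run(int n_threads, int iters)
{
    pthread_t tids[16];
    struct demo_metric metric;
    struct demo_worker_arg arg;
    int started = 0;
    int i;

    if (n_threads > 16) {
        n_threads = 16;
    }
    demo_atomic_store(&metric.val, 0);
    arg.metric = &metric;
    arg.iters = iters;

    for (i = 0; i < n_threads; i++) {
        if (pthread_create(&tids[i], NULL, demo_worker, &arg) != 0) {
            break;
        }
        started++;
    }

    /* concurrent exporter-style read; the value is intentionally discarded */
    (void) demo_atomic_load(&metric.val);

    for (i = 0; i < started; i++) {
        pthread_join(tids[i], NULL);
    }
    return demo_atomic_load(&metric.val);
}
```

Relaxed ordering is enough in this sketch because the counter is an independent scalar: readers only need a value that is not torn, not ordering against other memory.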

Note: these races are not the ARM64 SIGSEGV in
flb_input_chunk_ring_buffer_collector. That one is a separate issue, an
unsynchronized SPSC hand-off that is already serialized by the
flb_ring_buffer mutex on master. The races fixed here were found with the
same ThreadSanitizer setup while investigating threaded-mode data races
(cf. #9835).


Testing

Built with ThreadSanitizer (-fsanitize=thread, jemalloc disabled) and run for
~45s against a threaded tail + multiline.parser cri + null pipeline driven
by a CRI log generator (~6M lines, 6 files):

  • Before: TSan reports exactly these 3 races (flb_metrics_sum,
    flb_input_collector_fd, grace_input).
  • After: 0 ThreadSanitizer reports, clean shutdown, no crash.

Example configuration used:

[SERVICE]
    flush     0.2
    log_level info

[INPUT]
    name              tail
    path              /tmp/repro/logs/*.log
    read_from_head    true
    threaded          on
    multiline.parser  cri
    tag               kube.*

[OUTPUT]
    name   null
    match  *
  • Example configuration file for the change (above)
  • Debug log output from testing the change (ThreadSanitizer before/after summarized above; full logs available on request)
  • [N/A] Valgrind output — ThreadSanitizer is the appropriate tool for these data races; memcheck does not detect them. TSan before/after provided instead.

Documentation

  • [N/A] Documentation required for this feature (no user-facing/behavioral change)

Backporting

  • Backport to latest stable release (safe, self-contained; at maintainers' discretion)

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • Bug Fixes

    • Improved thread-safe handling for shared runtime values to reduce race conditions.
    • Metric counters and printed/exported values now use safer concurrent reads and updates.
    • Collector discovery now skips threaded collectors during initialization to avoid unstable access.
    • Supervisor state updates now use a consistent snapshot of shared grace-period values.
  • New Features

    • Added a shared atomic utility interface for safer cross-thread scalar operations.

erain and others added 4 commits June 26, 2026 17:13
flb_metrics_sum() updates a counter from the owning input or output
worker thread while the metrics exporter reads the same counter from
the main engine thread. With a threaded input this is a data race
reported by ThreadSanitizer; it is benign on the supported hardware
but undefined under the C memory model.

Add a small flb_atomic.h helper with relaxed load/store/fetch_add and
use it for the metric value on both the summing and the reading paths.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Yu Yi <yiyu@google.com>
flb_input_collector_fd() runs on the main engine thread but iterated
the collectors of every input, including threaded ones. A threaded
input initializes its collector descriptors from its own worker
thread, so this races with the main thread reading them at startup.

Those collectors are registered and dispatched in the input's own
thread and event loop, so they are never matched here. Skipping
threaded inputs removes the race and avoids needless iteration.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Yu Yi <yiyu@google.com>
config->grace_input is written once by the engine thread during
startup and read concurrently by the supervisor on the main thread,
which ThreadSanitizer reports as a data race.

Store it with a relaxed atomic; the matching atomic read is done at
the supervisor entry point.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Yu Yi <yiyu@google.com>
The supervisor entry point reads config->grace_input, which the engine
thread publishes during startup. Read it with a relaxed atomic so the
cross-thread access is well defined, matching the engine-side store.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Yu Yi <yiyu@google.com>
@coderabbitai

coderabbitai Bot commented Jun 26, 2026

Walkthrough

Adds relaxed atomic helpers and updates shared scalar reads and writes in engine, metrics, input, and supervisor paths.

Changes

Shared state atomics

| Layer / File(s) | Summary |
|---|---|
| Atomic helper header (include/fluent-bit/flb_atomic.h) | Defines relaxed load, store, and fetch-add macros with compiler builtins and a plain-memory fallback. |
| Grace input publication (src/flb_engine.c, src/fluent-bit.c) | Stores config->grace_input atomically in the engine and reads the published value atomically in the supervisor update path. |
| Metric counter atomics (src/flb_metrics.c) | Uses atomic fetch-add for accumulation and atomic loads when printing and dumping metric values. |
| Threaded collector skip (src/flb_input.c) | Skips threaded input instances in flb_input_collector_fd() during the collector scan for the main-thread fd path. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • cosmo0920

Poem

Hop hop, I found a safer way to share,
relaxed atoms glinting in the air.
Metrics count, and grace hops cleanly by,
while threaded fds drift past my bunny eye.
Thump thump, concurrency’s carrot feels fair.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly summarizes the main change: fixing cross-thread data races between threaded inputs and the main engine. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@include/fluent-bit/flb_atomic.h`:
- Around line 44-53: The fallback in flb_atomic.h currently maps
flb_atomic_load, flb_atomic_store, and flb_atomic_fetch_add to plain accesses,
which is unsafe for shared cross-thread state. Update the `#else` branch to use a
real atomic implementation for non-GCC/Clang compilers, or fail the build with
an explicit error if no atomic backend is available. Keep the fix scoped to the
flb_atomic_* macros so all existing call sites get proper atomic semantics.

In `@src/flb_engine.c`:
- Around line 1138-1139: The grace window update in flb_engine_started setup is
happening too late, so the main thread can observe a stale grace_input value
after FLB_ENGINE_STARTED is published. Move the flb_atomic_store for
config->grace_input to occur before calling flb_engine_started(config), keeping
the update in the startup path around the existing grace_input logic so
src/fluent-bit.c reads the correct supervisor grace window.

In `@src/fluent-bit.c`:
- Around line 1477-1479: The supervisor grace publication is only using an
atomic load in the fixed path, but the later hot-reload paths in the same
function still read ctx->config->grace_input directly and can reintroduce the
race. Update those additional grace publication sites in the function that
handles hot reloads to use flb_atomic_load on ctx->config->grace_input before
calling flb_supervisor_child_update_grace, keeping the access pattern consistent
everywhere in this flow.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bb82bdf9-9dc4-4eb0-ab1c-9e06c68eaf41

📥 Commits

Reviewing files that changed from the base of the PR and between f317143 and 144901f.

📒 Files selected for processing (5)
  • include/fluent-bit/flb_atomic.h
  • src/flb_engine.c
  • src/flb_input.c
  • src/flb_metrics.c
  • src/fluent-bit.c

Comment on lines +44 to +53
#else

/*
* Fallback for compilers without the GCC/Clang atomic builtins. The accesses
* stay plain; on the hardware these compilers target an aligned word access is
* atomic, and the races guarded here are benign, so correctness is preserved.
*/
#define flb_atomic_load(ptr) (*(ptr))
#define flb_atomic_store(ptr, val) (*(ptr) = (val))
#define flb_atomic_fetch_add(ptr, v) (*(ptr) += (v))

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -eu

# Verify whether this repository supports non-GCC/non-Clang C compilers and
# whether a platform-specific atomic backend is needed instead of a plain fallback.
{
  fd 'CMakeLists\.txt$|.*\.cmake$' .
  fd -e h -e c -e cpp .
} | xargs -r rg -n -C2 \
  'MSVC|_MSC_VER|CMAKE_C_COMPILER_ID|stdatomic|Interlocked|__atomic|ThreadSanitizer|fsanitize=thread'

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

# Inspect the atomic helper header, its call sites, and any platform/compiler
# gating around atomic support.
files=$(git ls-files \
  'include/fluent-bit/flb_atomic.h' \
  'include/**/*.h' \
  'src/**/*.c' \
  'CMakeLists.txt' \
  'cmake/**/*.cmake' \
  'lib/**/*.cmake' \
  2>/dev/null || true)

printf '%s\n' "$files" | rg -n 'flb_atomic|_MSC_VER|__atomic|stdatomic|Interlocked|atomic' -n -C 2 --no-heading || true

echo '--- HEADER ---'
sed -n '1,140p' include/fluent-bit/flb_atomic.h

echo '--- CALL SITES ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib || true

echo '--- BUILD/GATING ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID|atomic' CMakeLists.txt cmake include src lib || true

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo '--- FILE ---'
sed -n '1,160p' include/fluent-bit/flb_atomic.h

echo '--- USAGES ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib

echo '--- WINDOWS/COMPILER SUPPORT ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID' CMakeLists.txt cmake include src lib

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo '--- header ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo '--- direct usages ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib

echo '--- compiler support / fallbacks ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID|atomic' CMakeLists.txt cmake include src lib

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo '--- include/fluent-bit/flb_atomic.h ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo '--- flb_atomic usages ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib

echo '--- compiler/build gating near fluent-bit atomics ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|__atomic|stdatomic|Interlocked|atomic' CMakeLists.txt cmake include src lib | rg -n 'flb_atomic|atomic' -C 1

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo 'HEADER:'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo
echo 'USAGES:'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src | head -n 200

echo
echo 'BUILD FILES:'
rg -n -C 2 'flb_atomic|__atomic|stdatomic|_MSC_VER|CMAKE_C_COMPILER_ID' CMakeLists.txt cmake include src | head -n 200

Repository: fluent/fluent-bit

Length of output: 16002


🏁 Script executed:

#!/bin/bash
set -eu

# Focus only on fluent-bit atomics to determine whether the plain fallback
# is actually used on shared, multi-threaded state and whether any non-GCC/
# non-Clang backend exists for this header.
echo '--- include/fluent-bit/flb_atomic.h ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo
echo '--- flb_atomic call sites ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src

echo
echo '--- fluent-bit build/compiler checks ---'
rg -n -C 2 'flb_atomic|__atomic|stdatomic|_MSC_VER|CMAKE_C_COMPILER_ID|MSVC|Interlocked' CMakeLists.txt cmake include src | head -n 200

Repository: fluent/fluent-bit

Length of output: 15416


Replace the plain-access fallback with a real atomic backend or a build error. flb_atomic_load/store/fetch_add() are used on shared fields across threads, so the #else branch reintroduces a data race and makes fetch_add a non-atomic RMW on non-GCC/Clang compilers such as MSVC.
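
One way to read the "build error" option is sketched below. It is an assumption about how such a guard could look, not the change adopted in the PR; the strict_atomic_* macros and strict_bump() are hypothetical names.

```c
#include <stdint.h>

/* Hypothetical strict variant: refuse to build rather than fall back to plain accesses. */
#if defined(__GNUC__) || defined(__clang__)
#define strict_atomic_load(ptr)         __atomic_load_n((ptr), __ATOMIC_RELAXED)
#define strict_atomic_store(ptr, val)   __atomic_store_n((ptr), (val), __ATOMIC_RELAXED)
#define strict_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED)
#else
#error "no atomic backend for this compiler: add one instead of using plain accesses"
#endif

/* Small call site: add 'by' to the counter and return the value now stored. */
uint64_t strict_bump(uint64_t *counter, uint64_t by)
{
    strict_atomic_fetch_add(counter, by);
    return strict_atomic_load(counter);
}
```

The trade-off is that an unsupported compiler stops at the header instead of silently compiling a non-atomic read-modify-write, so a real backend has to be added before that toolchain builds again.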

Comment thread src/flb_engine.c
Comment on lines +1138 to +1139
/* published to the supervisor/main thread, which reads it concurrently */
flb_atomic_store(&config->grace_input, config->grace / 2);

🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -eu

# Inspect whether flb_engine_started() is the synchronization point that allows
# flb_start() / the main thread to continue.
rg -n -C5 '\bflb_engine_started\s*\(|\bflb_start\s*\(' src include

Repository: fluent/fluent-bit

Length of output: 5072


🏁 Script executed:

#!/bin/bash
set -eu

# Inspect the startup handshake and the supervisor-side read of grace_input.
sed -n '760,820p' src/flb_engine.c
printf '\n---\n'
sed -n '1,120p' src/flb_lib.c
printf '\n---\n'
sed -n '1458,1485p' src/fluent-bit.c

Repository: fluent/fluent-bit

Length of output: 5961


🏁 Script executed:

#!/bin/bash
set -eu

# Inspect do_start() and the library-side wait for the engine-start notification.
sed -n '880,1045p' src/flb_lib.c
printf '\n---\n'
sed -n '1045,1125p' src/flb_lib.c
printf '\n---\n'
rg -n -C4 'ENGINE_STARTED|FLB_ENGINE_STARTED|ch_notif|flb_pipe_r|flb_pipe_w|started' src/flb_lib.c src/flb_engine.c src/fluent-bit.c

Repository: fluent/fluent-bit

Length of output: 13207


Move the grace_input store before the startup signal. flb_start() returns only after the main thread reads FLB_ENGINE_STARTED, so src/fluent-bit.c can load a stale grace_input and publish the wrong supervisor grace window. Store it before flb_engine_started(config).
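
The ordering being asked for can be sketched as below, assuming the start notification is a single byte written to a pipe. The demo_config struct and both functions are hypothetical stand-ins for the engine startup path and the main-thread reader. The grace / 2 value comes from the diff above.

```c
#include <unistd.h>

/* Hypothetical relaxed helpers; GCC/Clang builtins are assumed to be available. */
#define demo_store(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELAXED)
#define demo_load(ptr)       __atomic_load_n((ptr), __ATOMIC_RELAXED)

struct demo_config {
    int grace;
    int grace_input;
};

/*
 * Engine side: publish grace_input first, then send the "started"
 * notification. Returns 0 on success, -1 if the notification fails.
 */
int demo_engine_startup(struct demo_config *cfg, int notify_fd)
{
    char started = 1;

    demo_store(&cfg->grace_input, cfg->grace / 2);   /* 1. publish the value */
    if (write(notify_fd, &started, 1) != 1) {        /* 2. then signal */
        return -1;
    }
    return 0;
}

/*
 * Main side: wait for the notification, then read the published value.
 * Returns grace_input, or -1 if the wait fails.
 */
int demo_wait_and_read(struct demo_config *cfg, int wait_fd)
{
    char started;

    if (read(wait_fd, &started, 1) != 1) {
        return -1;
    }
    return demo_load(&cfg->grace_input);
}
```

If the two steps in demo_engine_startup() were swapped, the reader could wake up and load grace_input before the store had run, which is the stale-value case described above.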

Comment thread src/fluent-bit.c
Comment on lines +1477 to +1479
+        /* grace_input is published by the engine thread (flb_engine_start) */
         flb_supervisor_child_update_grace(ctx->config->grace,
-                                          ctx->config->grace_input);
+                                          flb_atomic_load(&ctx->config->grace_input));

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Use atomic loads for every supervisor grace publication.

This path is fixed, but the hot-reload paths later in this function still read ctx->config->grace_input directly at Lines 1518-1531. Those paths can reintroduce the same race when the engine thread publishes the value.

Suggested consistency fix
@@
     struct flb_cf_section *section;
     struct flb_cf *cf_opts;
     struct flb_cf_group *group = NULL;
     int supervisor_reload_notified = FLB_FALSE;
+    int grace_input;
@@
     if (ctx != NULL && ctx->config != NULL) {
         /* grace_input is published by the engine thread (flb_engine_start) */
-        flb_supervisor_child_update_grace(ctx->config->grace,
-                                          flb_atomic_load(&ctx->config->grace_input));
+        grace_input = flb_atomic_load(&ctx->config->grace_input);
+        flb_supervisor_child_update_grace(ctx->config->grace, grace_input);
@@
-                flb_supervisor_child_signal_shutdown(ctx->config->grace,
-                                                     ctx->config->grace_input);
+                grace_input = flb_atomic_load(&ctx->config->grace_input);
+                flb_supervisor_child_signal_shutdown(ctx->config->grace, grace_input);
@@
-                    flb_supervisor_child_update_grace(ctx->config->grace,
-                                                      ctx->config->grace_input);
+                    grace_input = flb_atomic_load(&ctx->config->grace_input);
+                    flb_supervisor_child_update_grace(ctx->config->grace, grace_input);